/*
 * Copyright (c) 2022. China Mobile (SuZhou) Software Technology Co.,Ltd. All rights reserved.
 * Lakehouse is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

package com.chinamobile.cmss.lakehouse.core.deploy.component.instance.meta;

import com.chinamobile.cmss.lakehouse.common.kubernetes.K8sModelConstant;
import com.chinamobile.cmss.lakehouse.core.config.ComponentConfiguration;
import com.chinamobile.cmss.lakehouse.core.config.KubernetesConfiguration;
import com.chinamobile.cmss.lakehouse.core.config.SimpleHadoopConfiguration;
import com.chinamobile.cmss.lakehouse.core.deploy.component.descriptor.ComponentDescriptor;
import com.chinamobile.cmss.lakehouse.core.handler.K8sDeployHandler;
import com.chinamobile.cmss.lakehouse.core.handler.K8sServiceHandler;
import com.chinamobile.cmss.lakehouse.core.handler.NodePortHandler;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.kubernetes.client.custom.IntOrString;
import io.kubernetes.client.openapi.models.V1ConfigMap;
import io.kubernetes.client.openapi.models.V1Deployment;
import io.kubernetes.client.openapi.models.V1ObjectMetaBuilder;
import io.kubernetes.client.openapi.models.V1Service;
import io.kubernetes.client.openapi.models.V1ServicePort;
import io.kubernetes.client.openapi.models.V1ServiceSpec;
import io.kubernetes.client.openapi.models.V1StatefulSet;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MetaComponentDescriptor extends ComponentDescriptor {

    private static final int META_SERVER_SERVICE_PORT = 9083;
    private static final int META_PORT_COUNT = 1;

    private static final String META_CONFIG_MAP_NAME = "meta-config";
    private static final String META_STATEFUL_SET_NAME = "meta-sts";
    private static final String META_HEADLESS_SERVICE = "meta-headless";
    private static final String META_NODEPORT_SERVICE = "meta-nodeport";
    private static final String META_RPC_PORT_NAME = "rpc";

    @Autowired
    protected K8sDeployHandler deployHandler;

    @Autowired
    protected NodePortHandler nodePortHandler;

    public Optional<V1ConfigMap> buildMetaConfigMap(ComponentConfiguration config)
        throws IOException {
        // get hadoop info
        SimpleHadoopConfiguration hiveSiteContent = KubernetesConfiguration.getMetaSiteContent();

        // update metastore info
        hiveSiteContent.set(MetaComponentOptions.META_MYSQL_ADDRESS.key(),
            config.getString(MetaComponentOptions.META_MYSQL_ADDRESS));
        hiveSiteContent.set(MetaComponentOptions.META_MYSQL_USER.key(),
            config.getString(MetaComponentOptions.META_MYSQL_USER));
        hiveSiteContent.set(MetaComponentOptions.META_MYSQL_PASS.key(),
            config.getString(MetaComponentOptions.META_MYSQL_PASS));
        hiveSiteContent.set(MetaComponentOptions.META_WAREHOUSE_DIR.key(),
            config.getString(MetaComponentOptions.META_WAREHOUSE_DIR));
        hiveSiteContent.set(MetaComponentOptions.META_DEFAULT_FS.key(),
            config.getString(MetaComponentOptions.META_DEFAULT_FS));
        hiveSiteContent.set(MetaComponentOptions.META_S3_ENDPOINT.key(),
            config.getString(MetaComponentOptions.META_S3_ENDPOINT));
        hiveSiteContent.set(MetaComponentOptions.META_S3_ACCESS_KEY.key(),
            config.getString(MetaComponentOptions.META_S3_ACCESS_KEY));
        hiveSiteContent.set(MetaComponentOptions.META_S3_SECRET_KEY.key(),
            config.getString(MetaComponentOptions.META_S3_SECRET_KEY));

        V1ConfigMap configMap = parseConfigMap(KubernetesConfiguration.getMetaYamlTemplate());

        // set name
        configMap.getMetadata().setName(META_CONFIG_MAP_NAME);

        // set data
        Map<String, String> configMapData = configMap.getData();
        configMapData.put("meta-site.xml", hiveSiteContent.configContent());

        return Optional.of(configMap);
    }

    @Override
    public Optional<List<V1ConfigMap>> buildConfigMap(ComponentConfiguration config)
        throws IOException {

        List<V1ConfigMap> configMapList = Lists.newArrayList();
        buildMetaConfigMap(config).ifPresent(configMapList::add);
        return Optional.of(configMapList);
    }

    @Override
    public Optional<List<V1Service>> buildService(ComponentConfiguration config) throws IOException {
        V1Service headlessSvc = parseService(KubernetesConfiguration.getMetaYamlTemplate());

        // set headless service name
        headlessSvc.getMetadata().setName(META_HEADLESS_SERVICE);

        List<V1Service> services = Lists.newArrayList();
        services.add(headlessSvc);

        if (KubernetesConfiguration.exposeServiceEnabled) {
            services.addAll(buildNodePortService());
        }

        return Optional.of(services);
    }

    private List<V1Service> buildNodePortService() {
        List<V1Service> nodePortServices = Lists.newArrayList();
        // acquire node ports, only for ipv4
        int numGroups = 1;
        int requiredPortCount = numGroups * META_PORT_COUNT;
        List<Integer> nodePorts =
            nodePortHandler.acquireNodePorts(K8sModelConstant.DEFAULT_NAMESPACE, requiredPortCount);

        for (int index = 0; index < numGroups; index++) {
            // set labels
            Map<String, String> nodePortSvcMap = Maps.newHashMap();
            nodePortSvcMap.putAll(K8sModelConstant.META_SERVICE_LABELS);

            // set ports
            List<V1ServicePort> ports = Lists.newArrayList();
            V1ServicePort rpcPort =
                new V1ServicePort().name(META_RPC_PORT_NAME).port(nodePorts.get(index))
                    .nodePort(nodePorts.get(index)).targetPort(new IntOrString(META_SERVER_SERVICE_PORT));
            ports.add(rpcPort);

            V1Service nodePortSvc = new V1Service().apiVersion(K8sModelConstant.API_VERSION)
                .kind(K8sModelConstant.SERVICE_KIND)
                .metadata(new V1ObjectMetaBuilder().withName(META_NODEPORT_SERVICE)
                    .withLabels(nodePortSvcMap).build())
                .spec(new V1ServiceSpec().ports(ports)
                    .selector(K8sModelConstant.META_SERVICE_LABELS)
                    .type(K8sModelConstant.NODE_PORT));

            nodePortServices.add(nodePortSvc);
        }

        return nodePortServices;
    }

    @Override
    public Optional<V1StatefulSet> buildStatefulSet(ComponentConfiguration config)
        throws IOException {
        int replica = config.getInteger(MetaComponentOptions.META_REPLICA);

        V1StatefulSet statefulSet = parseStatefulSet(KubernetesConfiguration.getMetaYamlTemplate());

        // set replica
        statefulSet.getSpec().setReplicas(replica);

        // set headless service
        statefulSet.getSpec().setServiceName(META_HEADLESS_SERVICE);

        // update image
        statefulSet.getSpec().getTemplate().getSpec().getContainers().get(0)
            .image(KubernetesConfiguration.hiveImage);

        // update node selector
        statefulSet.getSpec().getTemplate().getSpec().nodeSelector(KubernetesConfiguration.getNodeSelector());

        // update resource
        double memExtended = KubernetesConfiguration.metaHeap * KubernetesConfiguration.memoryOverRatio;
        updateStatefulSetResources(statefulSet, KubernetesConfiguration.cpuOverRatio,
            KubernetesConfiguration.containerVcore, memExtended);

        // update args
        int jvmHeapSize = new Double(KubernetesConfiguration.metaHeap * 1024).intValue();
        String oldArgs =
            statefulSet.getSpec().getTemplate().getSpec().getContainers().get(0).getArgs().get(0);
        String newArgs = oldArgs.replaceAll(K8sModelConstant.JVM_HEAP_SIZE_HOLDER,
            String.format("-Xmx%dm", jvmHeapSize));
        updateStatefulSetArgs(statefulSet, newArgs);

        return Optional.of(statefulSet);
    }

    @Override
    public Optional<V1Deployment> buildDeployment(ComponentConfiguration config) throws IOException {
        return Optional.empty();
    }

    @Override
    public String k8sInternalService(String namespace) {
        return K8sServiceHandler.componentDomainService(namespace, META_HEADLESS_SERVICE,
            META_SERVER_SERVICE_PORT);
    }

    @Override
    public String k8sExternalService(String namespace) {
        return deployHandler.getExternalUrlFromService(namespace, META_NODEPORT_SERVICE, META_RPC_PORT_NAME);
    }

    public boolean isRunning(String namespace) {
        return deployHandler.checkComponentHealthStatus(namespace, META_STATEFUL_SET_NAME);
    }

    public boolean isCommonResourceExisting(String namespace) {
        return deployHandler.checkCommonResourceExisting(namespace, META_CONFIG_MAP_NAME,
            META_HEADLESS_SERVICE, META_STATEFUL_SET_NAME);
    }

}
